Skip to content

feat: replace BundlePoller polling with SSE streaming#269

Open
Evalir wants to merge 4 commits intographite-base/269from
evalir/eop/sse-bundle-poller
Open

feat: replace BundlePoller polling with SSE streaming#269
Evalir wants to merge 4 commits intographite-base/269from
evalir/eop/sse-bundle-poller

Conversation

@Evalir
Copy link
Copy Markdown
Member

@Evalir Evalir commented Apr 23, 2026

Description

Mirrors #259 (tx-poller SSE) for bundles. Replaces the 1s polling loop in BundlePoller with an SSE subscription to /bundles/feed via BuilderTxCache::subscribe_bundles (newly exposed by the SDK, matched by the tx-pool-webservice endpoint).

The structure matches TxPoller exactly:

  • Initial full_fetch paginates through stream_bundles to seed the cache on startup.
  • subscribe opens the SSE stream and yields CachedBundles in real time.
  • On stream error / stream end, reconnects with exponential backoff (1s → 30s cap), racing the sleep against the block-env watcher so an env change wins and triggers a fresh full refetch immediately.
  • On block-env change, the full_fetch runs again to cover anything missed during disconnection.
  • NotOurSlot is logged at trace level (expected when the builder is not slot-permissioned) in both the full-fetch and subscribe paths — avoids spurious warn-level noise.

The public check_bundle_cache() wrapper is dropped; the integration test now uses BuilderTxCache::stream_bundles directly, matching the style of tx_poller_test.rs.

Related Issue

Stacked on #259.

Testing

  • make fmt passes
  • make clippy passes
  • make test passes
  • cargo doc --no-deps passes with -D warnings
  • cargo test --features test-utils --no-run builds all integration tests
  • Exercise against a live tx-pool-webservice to verify SSE reconnect and env-driven refetch behavior end-to-end

Copy link
Copy Markdown
Member Author

Evalir commented Apr 23, 2026

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more

This stack of pull requests is managed by Graphite. Learn more about stacking.

@Evalir Evalir force-pushed the evalir/eop/sse-bundle-poller branch from b4f7193 to 8c472ae Compare April 23, 2026 21:45
Mirrors the TxPoller SSE change: subscribe to /bundles/feed via
BuilderTxCache::subscribe_bundles for real-time delivery of new bundles,
with an initial full_fetch on startup/block-env change and exponential
backoff reconnect on error or stream end. Drops the 1s polling loop.
@Evalir Evalir changed the base branch from evalir/eop/sse-tx-poller to graphite-base/269 April 27, 2026 11:33
@Evalir Evalir marked this pull request as ready for review April 27, 2026 11:34
Mirrors PR #259's 9bc0ff4 split applied to BundlePoller. Restore
check_bundle_cache as a private pure-fetch helper returning
Result<Vec<CachedBundle>, _>, and rename full_fetch to
fetch_and_forward — its name now matches what it does (fetch +
forward to the outbound channel). Use let-else over the fetch
result to drop a level of indentation.
@Evalir Evalir force-pushed the evalir/eop/sse-bundle-poller branch from 8c472ae to 58c25a1 Compare April 27, 2026 11:38
@Evalir Evalir changed the base branch from graphite-base/269 to evalir/eop/sse-tx-poller April 27, 2026 11:39
Copy link
Copy Markdown
Contributor

@dylanlott dylanlott left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should test these changes in Parmigiana asap before taking to mainnet.

Comment thread src/tasks/cache/bundle.rs
time::sleep(self.poll_duration()).await;
continue;
};
async fn task_future(mut self, outbound: mpsc::UnboundedSender<CachedBundle>) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should span this and pass the span to the lifecycle methods as necessary

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and as I think about it more, we probably want to add at least the block number to span's log fields.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Claude Code]

Done in 13b25a4task_future now carries an #[instrument] span with url and block_number fields. block_number is recorded on startup and re-recorded on every env-change branch so the field tracks the current block. The lifecycle methods (fetch_and_forward, subscribe, reconnect, handle_sse_item) inherit this span automatically via async context propagation, so I dropped the redundant per-method trace_span!s.

Comment thread src/tasks/cache/bundle.rs

// Check this here to avoid making the web request if we know
// we don't need the results.
if outbound.is_closed() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is removed from this arm, but it's still called in the tx cache arm. Need to check outbound.is_closed somewhere in this arm. In its current state, the simulator could drop its receiver while the SSE is in the error or none branch and would repeat its reconnection loop forever.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Claude Code]

Done in 13b25a4 — added an if outbound.is_closed() { break; } guard at the top of task_future's select loop, plus an early outbound.is_closed() check at the top of reconnect's loop body. Together those break the forever-reconnect cycle: if the simulator drops its receiver while we're stuck in a stream error/None loop, the next iteration of either loop exits cleanly.

Comment thread src/tasks/cache/bundle.rs Outdated
BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => {
trace!("Not our slot to subscribe to bundles");
}
_ => warn!(%error, "Failed to open SSE bundle subscription"),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should add a metric for SSE failures in the subscribe loop to match the inc_tx_poll_errors() in the fetch and dispatch loop.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Claude Code]

Done in 13b25a4 — added inc_sse_subscribe_errors and called it from subscribe's inspect_err. Used a generic name (no bundle_ prefix) to match the existing inc_sse_reconnect_attempts pattern; tx.rs can adopt the same helper later without renaming.

Comment thread src/tasks/cache/bundle.rs Outdated
outbound: &mpsc::UnboundedSender<CachedBundle>,
backoff: &mut Duration,
) -> SseStream {
tokio::select! {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should add a metric like inc_tx_sse_reconnects() as well

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Claude Code]

Done in 13b25a4BundlePoller::reconnect now increments the existing inc_sse_reconnect_attempts counter once per attempt, matching how TxPoller::reconnect uses it. Also generalized the metric's description from "SSE transaction stream reconnect attempts" to "SSE stream reconnect attempts" since both pollers share it.

Copy link
Copy Markdown
Contributor

@Fraser999 Fraser999 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use actual integration tests for this stream and the tx stream (I think the existing one is more like a connectivity smoke test). Not blocking, but worth a follow-up ticket?

Comment thread src/tasks/cache/bundle.rs Outdated
Comment thread src/tasks/cache/bundle.rs
Comment thread src/tasks/cache/bundle.rs
@Evalir Evalir changed the base branch from evalir/eop/sse-tx-poller to graphite-base/269 April 28, 2026 15:54
Addresses dylanlott + Fraser999 comments on the BundlePoller SSE
PR:

- Wrap task_future with #[instrument], adding url + block_number
  fields to the parent span; record block_number on every env
  change so tracing reflects current state. (dylan)
- Guard the task_future select loop with outbound.is_closed() so a
  dropped consumer breaks the loop instead of letting reconnect
  retry forever. (dylan)
- Subscribe now returns Option<SseStream>; reconnect loops
  internally with backoff until it gets a real stream or the
  outbound closes (returning Option). Kills the misleading double
  warn ("Failed to open SSE bundle subscription" + "stream
  ended") on subscribe failure. (Fraser/Claude)
- Add inc_sse_subscribe_errors metric, called from subscribe's
  error path. Generic name (not bundle-specific) since the existing
  inc_sse_reconnect_attempts is also generic. (dylan)
- Increment inc_sse_reconnect_attempts in BundlePoller::reconnect,
  matching tx.rs. (dylan)
- Bump "Block env changed" log from trace to debug, matching the
  agreed change on tx.rs. (Fraser)
- Generalize SSE_RECONNECT_ATTEMPTS_HELP from "transaction stream"
  to just "SSE stream" since the metric is shared.

Defers (per plan): renaming "poll" metrics to "fetch" — that's a
cross-cutting metrics rename that should ride in its own PR.
@Evalir Evalir requested review from Fraser999 and dylanlott April 29, 2026 09:54
- Collapse the duplicated reconnect match arms in handle_sse_item
  into let-else, matching the project's terse Option/Result style.
- Promote record_block_number from a free fn to a &self method,
  shrinking the call sites and using SimEnv::rollup_block_number()
  instead of inlining the rollup_env().number.to::<u64>() chain.
- Drop a stray WHAT-comment on reconnect's loop tail.
Comment thread src/tasks/cache/bundle.rs
Comment on lines +168 to +171
#[instrument(
skip_all,
fields(url = %self.config.tx_pool_url, block_number = tracing::field::Empty),
)]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a long-running task under normal conditions, so shouldn't be instrumented like this I believe. Maybe something more like how we do instrumentation in SubmitTask::task_future would work better? That would also avoid the call to Span::current() inside record_block_number which can be flaky depending on the runtime log level chosen.

Comment thread src/tasks/cache/bundle.rs
let (outbound, inbound) = unbounded_channel();
fn record_block_number(&self) {
if let Some(env) = self.envs.borrow().as_ref() {
Span::current().record("block_number", env.rollup_block_number());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we use sim.ru.number for this elsewhere rather than block_number.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants